package com.atguigu.flink.sql.helloworld;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Minimal Flink SQL example: reads text lines from a socket, maps them to
 * {@link WaterSensor} records, registers the stream as a table and filters it with SQL.
 *
 * <p>Originally created by Smexy on 2023/11/20.
 *
 * <p>SQL can be used for defining tables and for computing on them; everything else,
 * such as setting up the execution environments, still has to be written in code.
 */
public class Demo2_SQL
{
    /**
     * Builds the socket-backed table, runs the SQL filter and prints the matching rows.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Each text line received from the socket is mapped to a WaterSensor.
        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        // Convert the stream into a Table.
        Table table = tableEnv.fromDataStream(ds);

        // Register the table under a name so that SQL can refer to it.
        tableEnv.createTemporaryView("t1", table);

        // Keep only the rows whose id equals 's1'.
        String sql = " select * from t1 where id = 's1'";

        Table result = tableEnv.sqlQuery(sql);

        // Submit the query for execution and print the result rows.
        result.execute().print();

        // Deliberately no env.execute() here: Table.execute() above already submits the
        // job, and this program attaches no DataStream sink that env.execute() would
        // be needed to run.
    }
}
